# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import re
from copy import copy

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster
from tests.common.test_dimensions import (
    add_exec_option_dimension,
    create_exec_option_dimension)

"""Run sizes (number of pages per run) in sorter.
Values:
  0: there is no limit on the size of an in-memory run. The sorter will allocate memory
    to fit the data until it encounters some memory limit.
  2: an in-memory run can be at most 2 pages. This is the smallest possible size of a run:
    at least 1 page for fixed-len data and 1 page for var-len data.
  20: an in-memory run can be at most 20 pages.
In-memory runs that are too small can fragment memory when the data is var-len, so some
test cases need different memory or spilling limits to trigger the same scenarios."""
MAX_SORT_RUN_SIZE = [0, 2, 20]


def split_result_rows(result):
  """Break every tab-delimited row string of 'result' into its fields.

  Returns one list of field strings per input row, in input order, e.g.
  [[a1, a2], [b1, b2], [c1, c2]]."""
  rows = []
  for line in result:
    rows.append(line.split('\t'))
  return rows


def transpose_results(result, map_fn=lambda x: x):
  """Turn a row-oriented query result into a column-oriented one.

  'result' is a list of strings, one tab-separated string per row. The return value is
  a list of columns, each column being the list of that column's values in row order.
  'map_fn' is applied to every individual value and can be used, for example, to
  convert the strings to their underlying types; by default values are left as-is."""
  rows = split_result_rows(result)
  # zip(*rows) steps through all rows in lockstep, yielding one tuple per column,
  # i.e. (a1, b1, c1), then (a2, b2, c2), ...
  return [[map_fn(value) for value in column] for column in zip(*rows)]


class TestQueryFullSort(ImpalaTestSuite):
  """Test class to do functional validation of sorting when data is spilled to disk.
  All tests under this class run with num_nodes=1."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    """Set up the test matrix: a single-node exec option dimension plus one vector per
    value in MAX_SORT_RUN_SIZE. With the 'core' exploration strategy only parquet is
    tested."""
    super(TestQueryFullSort, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_multiple_buffer_pool_limits(self, vector):
    """Using lineitem table forces the multi-phase sort with low buffer_pool_limit.
       This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
            from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    table_format = vector.get_value('table_format')

    # The first run should fit in memory, the second run is a 2-phase disk sort,
    # and the third run is a multi-phase sort (i.e. with an intermediate merge).
    for buffer_pool_limit in ['-1', '300m', '130m']:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      query_result = self.execute_query(
        query, exec_option, table_format=table_format)
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))

  def test_multiple_sort_run_bytes_limits(self, vector):
    """Using lineitem table forces the multi-phase sort with low sort_run_bytes_limit.
       This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
            from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    table_format = vector.get_value('table_format')

    # The first sort run is given a privilege to ignore sort_run_bytes_limit, except
    # when estimate hints that spill is inevitable. The lower sort_run_bytes_limit of
    # a query is, the more sort runs are likely to be produced and spilled.
    # Case 1 : 0 SpilledRuns, because all rows fit within the maximum reservation.
    #          sort_run_bytes_limit is not enforced.
    # Case 2 : 4 or 5 SpilledRuns, because sort node estimates that spill is inevitable.
    #          So all runs are capped to 130m, including the first one.
    # max_sort_run_size > 0 will spill more in Case 2.
    options = [('2g', '100m', '0'),
           ('400m', '130m', ('5' if exec_option['max_sort_run_size'] > 0 else '4'))]
    for (mem_limit, sort_run_bytes_limit, spilled_runs) in options:
      exec_option['mem_limit'] = mem_limit
      exec_option['sort_run_bytes_limit'] = sort_run_bytes_limit
      query_result = self.execute_query(
          query, exec_option, table_format=table_format)
      # Capture the whole number so that an expected '4' cannot be satisfied by a
      # counter value such as 40, and fail with a readable message (rather than an
      # AttributeError on None) if the counter is absent from the profile.
      m = re.search(r'\s+\- SpilledRuns: (\d+)', query_result.runtime_profile)
      assert m is not None, "SpilledRuns counter not found in the runtime profile"
      assert m.group(1) == spilled_runs, m.group(0)
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))

  def test_multiple_mem_limits_full_output(self, vector):
    """ Exercise a range of memory limits, returning the full sorted input. """
    query = """select o_orderdate, o_custkey, o_comment
      from orders
      order by o_orderdate"""
    exec_option = copy(vector.get_value('exec_option'))
    table_format = vector.get_value('table_format')
    exec_option['default_spillable_buffer_size'] = '8M'

    # Minimum memory for different parts of the plan.
    buffered_plan_root_sink_reservation_mb = 16
    sort_reservation_mb = 48
    if table_format.file_format == 'parquet':
      scan_reservation_mb = 24
    else:
      scan_reservation_mb = 8
    total_reservation_mb = sort_reservation_mb + scan_reservation_mb \
                           + buffered_plan_root_sink_reservation_mb

    # The memory values below assume 8M pages.
    # Test with unlimited and minimum memory for all file formats.
    buffer_pool_limit_values = ['-1', '{0}M'.format(total_reservation_mb)]
    if self.exploration_strategy() == 'exhaustive' and \
        table_format.file_format == 'parquet':
      # Test some intermediate values for parquet on exhaustive.
      buffer_pool_limit_values += ['128M', '256M']
    for buffer_pool_limit in buffer_pool_limit_values:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
      assert(result[0] == sorted(result[0]))

  def test_sort_join(self, vector):
    """With minimum memory limit this should be a 1-phase sort"""
    query = """select o1.o_orderdate, o2.o_custkey, o1.o_comment from orders o1 join
    orders o2 on (o1.o_orderkey = o2.o_orderkey) order by o1.o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    # With max_sort_run_size=2 (2 pages per run) the varlen data is more fragmented and
    # requires a higher limit to maintain "TotalMergesPerformed: 1" assertion.
    if exec_option['max_sort_run_size'] == 2:
      exec_option['mem_limit'] = "144m"
    else:
      exec_option['mem_limit'] = "134m"
    table_format = vector.get_value('table_format')

    query_result = self.execute_query(query, exec_option, table_format=table_format)
    assert "TotalMergesPerformed: 1" in query_result.runtime_profile
    result = transpose_results(query_result.data)
    assert(result[0] == sorted(result[0]))

  def test_sort_union(self, vector):
    """Sort the output of a 'union' / 'union all' over orders and check that the first
    column comes back ordered."""
    query = """select o_orderdate, o_custkey, o_comment from (select * from orders union
    select * from orders union all select * from orders) as i
    order by o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['mem_limit'] = "3000m"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
      query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))

  def test_pathological_input(self, vector):
    """ Regression test for stack overflow and poor performance on certain inputs where
    always selecting the middle element as a quicksort pivot caused poor performance. The
    trick is to concatenate two equal-size sorted inputs. If the middle element is always
    selected as the pivot (the old method), the sorter tends to get stuck selecting the
    minimum element as the pivot, which results in almost all of the tuples ending up
    in the right partition.
    """
    query = """select l_orderkey from (
      select * from lineitem limit 300000
      union all
      select * from lineitem limit 300000) t
    order by l_orderkey"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    # Run with a single scanner thread so that the input doesn't get reordered.
    exec_option['num_scanner_threads'] = "1"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
      query, exec_option, table_format=table_format).data)
    # Compare as integers: the raw result values are strings and would otherwise be
    # compared lexicographically.
    numeric_results = [int(val) for val in result[0]]
    assert(numeric_results == sorted(numeric_results))

  def test_spill_empty_strings(self, vector):
    """Test corner case of spilling sort with only empty strings. Spilling with var len
    slots typically means the sort must reorder blocks and convert pointers, but this case
    has to be handled differently because there are no var len blocks to point into."""

    query = """
    select empty_str, l_orderkey, l_partkey, l_suppkey,
        l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax
    from (select substr(l_comment, 1000, 0) empty_str, * from lineitem) t
    order by empty_str, l_orderkey, l_partkey, l_suppkey, l_linenumber
    limit 100000
    """

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['buffer_pool_limit'] = "256m"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
      query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))

  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_sort_reservation_usage(self, vector):
    """Tests for sorter reservation usage.
    If max_sort_run_size > 0, the larger the run size, the sooner the sorter can give up
    memory to the next node."""
    if vector.get_value('exec_option')['max_sort_run_size'] == 2:
      # Raise the buffer pool limit so that the query still never spills.
      buffer_pool_limit = '27m'
    else:
      buffer_pool_limit = '14m'
    self.run_test_case('sort-reservation-usage-single-node', vector,
                       test_file_vars={'$BUFFER_POOL_LIMIT': buffer_pool_limit})


class TestRandomSort(ImpalaTestSuite):
  """Functional validation of sorting on random() keys. Every test runs once per value
  in MAX_SORT_RUN_SIZE."""

  @classmethod
  def add_test_dimensions(cls):
    """Add one test vector per value in MAX_SORT_RUN_SIZE. With the 'core' exploration
    strategy only parquet is tested."""
    super(TestRandomSort, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_order_by_random(self, vector):
    """Tests that 'order by random()' works as expected."""
    # Every query below is run with the vector's exec options so that the
    # max_sort_run_size dimension is applied to all of them.
    exec_option = copy(vector.get_value('exec_option'))
    # "order by random()" with different seeds should produce different orderings.
    seed_query = "select * from functional.alltypestiny order by random(%s)"
    results_seed0 = self.execute_query(seed_query % "0", exec_option)
    results_seed1 = self.execute_query(seed_query % "1", exec_option)
    assert results_seed0.data != results_seed1.data
    assert sorted(results_seed0.data) == sorted(results_seed1.data)

    # transpose_results() returns a list of columns, so the sorted-ness checks below
    # must look at the single column, results[0]. Comparing the one-element outer list
    # with its sorted self would always succeed.

    # Include "random()" in the select list to check that it's sorted correctly.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypessmall order by r",
        exec_option).data, float)
    assert(results[0] == sorted(results[0]))

    # Like above, but with a limit.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypes order by r limit 100",
        exec_option).data, float)
    assert(results[0] == sorted(results[0]))

    # "order by random()" inside an inline view.
    query = "select r from (select random() r from functional.alltypessmall) v order by r"
    results = transpose_results(self.execute_query(query, exec_option).data, float)
    assert(results[0] == sorted(results[0]))

  def test_analytic_order_by_random(self, vector):
    """Tests that a window function over 'order by random()' works as expected."""
    exec_option = copy(vector.get_value('exec_option'))
    # Since we use the same random seed, the results should be returned in order.
    query = """select last_value(rand(2)) over (order by rand(2)) from
      functional.alltypestiny"""
    results = transpose_results(self.execute_query(query, exec_option).data, float)
    # Check the column itself; the outer list holds a single column and is trivially
    # equal to its sorted self.
    assert(results[0] == sorted(results[0]))


class TestPartialSort(ImpalaTestSuite):
  """Test class to do functional validation of partial sorts."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    """Add one test vector per value in MAX_SORT_RUN_SIZE. With the 'core' exploration
    strategy only parquet is tested."""
    super(TestPartialSort, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_partial_sort_min_reservation(self, vector, unique_database):
    """Test that the partial sort node can operate if it only gets its minimum
    memory reservation."""
    table_name = "%s.kudu_test" % unique_database
    self.execute_query("""create table %s (col0 string primary key)
        partition by hash(col0) partitions 8 stored as kudu""" % table_name)
    exec_option = copy(vector.get_value('exec_option'))
    # Pass the debug action as a query option of the INSERT itself, like every other
    # option in this file, instead of mutating the shared client's configuration.
    # NOTE(review): presumably execute_query() applies the options it is given in place
    # of whatever was set on the client, in which case a client-level debug_action
    # would not reach the INSERT - confirm against ImpalaTestSuite.
    exec_option['debug_action'] = "-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0"
    result = self.execute_query(
        "insert into %s select string_col from functional.alltypessmall" % table_name,
        exec_option)
    assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile

  def test_partial_sort_kudu_insert(self, vector, unique_database):
    """Insert 300000 lineitem rows into a range-partitioned Kudu table and check, via
    the runtime profile, that all rows were modified and none produced an error."""
    table_name = "%s.kudu_partial_sort_test" % unique_database
    self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT,
      l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2),
      l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey) )
      PARTITION BY RANGE (l_linenumber)
      (
        PARTITION VALUE = 1,
        PARTITION VALUE = 2,
        PARTITION VALUE = 3,
        PARTITION VALUE = 4,
        PARTITION VALUE = 5,
        PARTITION VALUE = 6,
        PARTITION VALUE = 7
      )
      STORED AS KUDU""" % table_name)
    exec_option = copy(vector.get_value('exec_option'))
    result = self.execute_query(
        """insert into %s SELECT l_linenumber, l_orderkey, l_partkey, l_shipdate,
        l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name,
        exec_option)
    assert "NumModifiedRows: 300000" in result.runtime_profile, result.runtime_profile
    assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile


class TestArraySort(ImpalaTestSuite):
  """Sorting tests whose sorting tuple contains arrays.
  All of them are executed with num_nodes=1."""

  @classmethod
  def add_test_dimensions(cls):
    """Single-node exec options, one vector per MAX_SORT_RUN_SIZE value, parquet only."""
    super(TestArraySort, cls).add_test_dimensions()
    single_node_dimension = create_exec_option_dimension(cluster_sizes=[1])
    cls.ImpalaTestMatrix.add_dimension(single_node_dimension)
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    def is_parquet(v):
      # The queried table is a parquet table.
      return v.get_value('table_format').file_format == 'parquet'

    cls.ImpalaTestMatrix.add_constraint(is_parquet)

  def test_array_sort(self, vector):
    """Run the array sort query without a limit."""
    self._run_test_sort_query(vector, None)

  def test_array_sort_with_limit(self, vector):
    """Run the array sort query with a limit of 3000 rows."""
    self._run_test_sort_query(vector, 3000)

  def _run_test_sort_query(self, vector, limit):
    """Run a spilling sort whose sorting tuple holds an array with var-len data and
    verify that the rows come back ordered by the first column. A truthy 'limit' must
    be an integer and is appended to the query as a limit clause."""
    query = """select string_col, int_array, double_map, string_array, mixed
        from functional_parquet.arrays_big order by string_col"""
    if limit:
      assert isinstance(limit, int)
      query += " limit {}".format(limit)

    file_format = vector.get_value('table_format')
    options = copy(vector.get_value('exec_option'))
    options['disable_outermost_topn'] = 1
    options['buffer_pool_limit'] = '44m'

    query_result = self.execute_query(query, options, table_format=file_format)

    # The profile must report at least one spilled run.
    profile = query_result.runtime_profile
    assert re.search(r'\s+\- SpilledRuns: [1-9]', profile)

    # Rows arrive as strings; break them into columns and compare the returned order
    # with the same rows sorted on the first column.
    rows = split_result_rows(query_result.data)
    expected_rows = sorted(rows, key=lambda row: row[0])
    assert(rows == expected_rows)
